---
title: 追踪系统 | 可观测性 | Kastrax 文档
description: Kastrax 追踪系统的详细介绍，包括追踪架构、追踪 API 和追踪集成的实现和使用方法。
---

# 追踪系统 ✅

Kastrax 追踪系统提供了一套全面的工具和方法，用于监控和分析 AI 代理的行为和性能。本指南详细介绍了追踪系统的架构、组件和使用方法。

## 什么是追踪系统？ ✅

追踪系统是一个框架，用于：

- 记录 AI 代理的行为和决策过程
- 监控代理的性能和资源使用情况
- 分析代理的调用链和依赖关系
- 诊断问题和优化性能
- 提供可视化和报告功能

通过追踪系统，您可以深入了解代理的内部工作原理，并在出现问题时快速定位和解决。

## 追踪系统架构 ✅

Kastrax 追踪系统由以下核心组件组成：

1. **追踪 API**：用于创建和管理追踪
2. **追踪收集器**：收集和处理追踪数据
3. **追踪存储**：存储追踪数据
4. **追踪分析器**：分析追踪数据并生成报告
5. **追踪可视化**：可视化追踪数据

## 基本追踪用法 ✅

以下是使用 Kastrax 追踪系统的简单示例：

```kotlin
import ai.kastrax.observability.tracing.Tracer
import ai.kastrax.observability.tracing.Span
import ai.kastrax.core.agent.agent
import ai.kastrax.integrations.deepseek.deepSeek
import ai.kastrax.integrations.deepseek.DeepSeekModel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // 初始化追踪器
    val tracer = Tracer.create("my-application")
    
    // 创建根追踪
    val rootSpan = tracer.startSpan("main-process")
    
    try {
        // 创建代理
        rootSpan.addEvent("创建代理")
        val myAgent = agent {
            name("追踪测试代理")
            description("用于追踪的测试代理")
            
            model = deepSeek {
                apiKey("your-deepseek-api-key")
                model(DeepSeekModel.DEEPSEEK_CHAT)
                temperature(0.7)
            }
            
            // 启用追踪
            tracing {
                enabled = true
                tracer = tracer
            }
        }
        
        // 创建子追踪
        val generateSpan = tracer.startSpan("generate-response", parent = rootSpan)
        
        try {
            // 添加追踪属性
            generateSpan.setAttribute("query", "什么是人工智能？")
            
            // 生成响应
            val response = myAgent.generate("什么是人工智能？")
            
            // 添加响应信息
            generateSpan.setAttribute("response_length", response.text.length)
            generateSpan.setAttribute("token_usage", response.usage?.totalTokens ?: 0)
            
            println("响应: ${response.text}")
        } finally {
            // 结束子追踪
            generateSpan.end()
        }
        
        // 添加事件
        rootSpan.addEvent("处理完成")
    } catch (e: Exception) {
        // 记录错误
        rootSpan.recordException(e)
        println("发生错误: ${e.message}")
    } finally {
        // 结束根追踪
        rootSpan.end()
    }
    
    // 关闭追踪器
    tracer.close()
}
```

## 追踪 API ✅

### 创建追踪器

```kotlin
// 创建基本追踪器
val tracer = Tracer.create("my-application")

// 创建带配置的追踪器
val configuredTracer = Tracer.create("my-application") {
    sampler = Sampler.ALWAYS_ON
    propagator = W3CPropagator.getInstance()
    exporter = JaegerExporter {
        endpoint = "http://localhost:14250"
    }
}
```

### 创建追踪

```kotlin
// 创建根追踪
val rootSpan = tracer.startSpan("main-process")

// 创建子追踪
val childSpan = tracer.startSpan("sub-process", parent = rootSpan)

// 创建带属性的追踪
val attributedSpan = tracer.startSpan("process-with-attributes") {
    setAttribute("user_id", "user-123")
    setAttribute("request_id", UUID.randomUUID().toString())
    setAttribute("priority", 1)
}
```

### 追踪操作

```kotlin
// 添加属性
span.setAttribute("key", "value")
span.setAttribute("count", 42)
span.setAttribute("enabled", true)

// 添加事件
span.addEvent("开始处理")
span.addEvent("处理完成", mapOf("duration_ms" to 150))

// 记录异常
try {
    // 可能抛出异常的代码
} catch (e: Exception) {
    span.recordException(e)
}

// 设置状态
span.setStatus(StatusCode.OK, "处理成功")
span.setStatus(StatusCode.ERROR, "处理失败: 无效输入")

// 结束追踪
span.end()
```

### 上下文传播

```kotlin
// 获取当前追踪上下文
val context = tracer.currentContext()

// 在新协程中使用上下文
kotlinx.coroutines.withContext(context.asContextElement()) {
    // 在此协程中创建的追踪将自动关联到父追踪
    val span = tracer.startSpan("coroutine-process")
    // ...
    span.end()
}

// 跨服务传播
val headers = mutableMapOf<String, String>()
tracer.inject(context, headers) { carrier, key, value ->
    carrier[key] = value
}

// 在另一个服务中提取上下文
val extractedContext = tracer.extract(receivedHeaders) { carrier, key ->
    carrier[key]
}
```

## 追踪集成 ✅

### 代理追踪

```kotlin
// 创建带追踪的代理
val agent = agent {
    name("追踪代理")
    description("带追踪功能的代理")
    
    model = deepSeek {
        apiKey("your-deepseek-api-key")
        model(DeepSeekModel.DEEPSEEK_CHAT)
        temperature(0.7)
    }
    
    // 启用追踪
    tracing {
        enabled = true
        tracer = tracer
        
        // 配置追踪详细程度
        level = TracingLevel.DETAILED
        
        // 配置追踪属性
        attributes {
            include("model")
            include("temperature")
            include("max_tokens")
        }
        
        // 配置敏感数据处理
        sensitiveData {
            mask("api_key")
            mask("user_data")
        }
    }
}
```

### 工具追踪

```kotlin
// 创建带追踪的工具
val tracedTool = tool("calculator") {
    description("执行基本的数学计算")
    parameters {
        parameter("expression", "要计算的数学表达式", String::class)
    }
    
    // 启用追踪
    tracing {
        enabled = true
        tracer = tracer
    }
    
    execute { params ->
        // 创建追踪
        val span = tracer.startSpan("calculator-execution")
        
        try {
            val expression = params["expression"] as String
            span.setAttribute("expression", expression)
            
            // 执行计算
            val result = evalExpression(expression)
            span.setAttribute("result", result.toString())
            
            "计算结果: $result"
        } catch (e: Exception) {
            span.recordException(e)
            span.setStatus(StatusCode.ERROR, e.message ?: "计算错误")
            "无法计算表达式: ${e.message}"
        } finally {
            span.end()
        }
    }
}
```

### 记忆系统追踪

```kotlin
// 创建带追踪的记忆系统
val tracedMemory = memory {
    type = MemoryType.VECTOR
    
    // 启用追踪
    tracing {
        enabled = true
        tracer = tracer
    }
}

// 使用记忆系统
val span = tracer.startSpan("memory-operation")
try {
    // 添加记忆
    memory.add("用户喜欢蓝色")
    
    // 检索记忆
    val relevantMemories = memory.retrieve("用户喜欢什么颜色？")
    span.setAttribute("memory_count", relevantMemories.size)
    
    println("相关记忆: $relevantMemories")
} finally {
    span.end()
}
```

### RAG 系统追踪

```kotlin
// 创建带追踪的 RAG 系统
val tracedRAG = RAG(
    vectorStore = vectorStore,
    embeddingService = embeddingService,
    tracer = tracer
)

// 使用 RAG 系统
val span = tracer.startSpan("rag-search")
try {
    // 设置查询
    val query = "什么是量子计算？"
    span.setAttribute("query", query)
    
    // 执行搜索
    val results = tracedRAG.search(query, limit = 5)
    span.setAttribute("result_count", results.size)
    
    println("找到 ${results.size} 个相关文档")
} finally {
    span.end()
}
```

## 追踪导出器 ✅

Kastrax 支持多种追踪导出器，用于将追踪数据发送到不同的后端系统：

### 控制台导出器

```kotlin
// 创建控制台导出器
val consoleExporter = ConsoleSpanExporter()

// 配置追踪器使用控制台导出器
val tracer = Tracer.create("my-application") {
    exporter = consoleExporter
}
```

### Jaeger 导出器

```kotlin
// 创建 Jaeger 导出器
val jaegerExporter = JaegerExporter {
    endpoint = "http://localhost:14250"
    username = "user"
    password = "pass"
}

// 配置追踪器使用 Jaeger 导出器
val tracer = Tracer.create("my-application") {
    exporter = jaegerExporter
}
```

### Zipkin 导出器

```kotlin
// 创建 Zipkin 导出器
val zipkinExporter = ZipkinExporter {
    endpoint = "http://localhost:9411/api/v2/spans"
}

// 配置追踪器使用 Zipkin 导出器
val tracer = Tracer.create("my-application") {
    exporter = zipkinExporter
}
```

### OTLP 导出器

```kotlin
// 创建 OTLP 导出器
val otlpExporter = OtlpExporter {
    endpoint = "http://localhost:4317"
    protocol = OtlpProtocol.GRPC
}

// 配置追踪器使用 OTLP 导出器
val tracer = Tracer.create("my-application") {
    exporter = otlpExporter
}
```

### 自定义导出器

```kotlin
// 创建自定义导出器
class CustomExporter : SpanExporter {
    override fun export(spans: List<SpanData>): CompletableResultCode {
        // 实现自定义导出逻辑
        for (span in spans) {
            // 处理追踪数据
            println("导出追踪: ${span.name}")
            // 可以将数据发送到自定义后端
        }
        return CompletableResultCode.ofSuccess()
    }
    
    override fun flush(): CompletableResultCode {
        // 实现刷新逻辑
        return CompletableResultCode.ofSuccess()
    }
    
    override fun shutdown(): CompletableResultCode {
        // 实现关闭逻辑
        return CompletableResultCode.ofSuccess()
    }
}

// 配置追踪器使用自定义导出器
val tracer = Tracer.create("my-application") {
    exporter = CustomExporter()
}
```

## 追踪采样器 ✅

采样器决定哪些追踪应该被记录和导出：

```kotlin
// 始终采样
val alwaysOnSampler = Sampler.ALWAYS_ON

// 从不采样
val alwaysOffSampler = Sampler.ALWAYS_OFF

// 概率采样（采样 50% 的追踪）
val probabilitySampler = Sampler.probability(0.5)

// 基于速率的采样（每秒最多 10 个追踪）
val rateLimitingSampler = Sampler.rateLimit(10)

// 配置追踪器使用采样器
val tracer = Tracer.create("my-application") {
    sampler = probabilitySampler
}
```

## 追踪可视化 ✅

Kastrax 提供了多种方式来可视化追踪数据：

### Jaeger UI

```kotlin
// 配置 Jaeger 导出器
val tracer = Tracer.create("my-application") {
    exporter = JaegerExporter {
        endpoint = "http://localhost:14250"
    }
}

// 然后可以在 Jaeger UI 中查看追踪数据
// 通常在 http://localhost:16686
```

### Zipkin UI

```kotlin
// 配置 Zipkin 导出器
val tracer = Tracer.create("my-application") {
    exporter = ZipkinExporter {
        endpoint = "http://localhost:9411/api/v2/spans"
    }
}

// 然后可以在 Zipkin UI 中查看追踪数据
// 通常在 http://localhost:9411
```

### 自定义可视化

```kotlin
// 创建自定义可视化
class TracingDashboard(private val tracer: Tracer) {
    fun generateReport(timeRange: TimeRange): TracingReport {
        // 获取指定时间范围内的追踪数据
        val spans = tracer.getSpans(timeRange)
        
        // 生成报告
        return TracingReport(
            spans = spans,
            summary = generateSummary(spans),
            charts = generateCharts(spans)
        )
    }
    
    private fun generateSummary(spans: List<SpanData>): TracingSummary {
        // 生成摘要统计信息
        val totalSpans = spans.size
        val errorSpans = spans.count { it.status.isError }
        val avgDuration = spans.map { it.endTime - it.startTime }.average()
        
        return TracingSummary(
            totalSpans = totalSpans,
            errorSpans = errorSpans,
            avgDuration = avgDuration
        )
    }
    
    private fun generateCharts(spans: List<SpanData>): List<Chart> {
        // 生成可视化图表
        val charts = mutableListOf<Chart>()
        
        // 示例：生成持续时间分布图
        charts.add(
            HistogramChart(
                title = "追踪持续时间分布",
                data = spans.map { it.endTime - it.startTime }
            )
        )
        
        // 示例：生成错误率图
        charts.add(
            PieChart(
                title = "追踪状态分布",
                data = mapOf(
                    "成功" to spans.count { !it.status.isError },
                    "错误" to spans.count { it.status.isError }
                )
            )
        )
        
        return charts
    }
}

// 使用自定义可视化
val dashboard = TracingDashboard(tracer)
val report = dashboard.generateReport(
    TimeRange(
        start = System.currentTimeMillis() - 24 * 60 * 60 * 1000, // 24 小时前
        end = System.currentTimeMillis()
    )
)

// 显示报告
println("追踪报告:")
println("总追踪数: ${report.summary.totalSpans}")
println("错误追踪数: ${report.summary.errorSpans}")
println("平均持续时间: ${report.summary.avgDuration} ms")

// 保存报告
report.saveToFile("tracing_report.html")
```

## 追踪最佳实践 ✅

1. **命名约定**：使用一致的命名约定，如 `operation_name`
2. **适当粒度**：选择合适的追踪粒度，既不要太细也不要太粗
3. **关键属性**：添加有助于诊断的关键属性，如用户 ID、请求 ID 等
4. **错误处理**：始终记录异常和错误状态
5. **性能考虑**：注意追踪对性能的影响，使用采样器减少开销
6. **敏感数据**：避免在追踪中包含敏感数据，或使用掩码处理
7. **上下文传播**：在分布式系统中正确传播追踪上下文
8. **定期审查**：定期审查追踪数据，识别性能问题和优化机会

## 完整示例 ✅

以下是一个完整的追踪示例，包括代理、工具和 RAG 系统：

```kotlin
import ai.kastrax.observability.tracing.Tracer
import ai.kastrax.observability.tracing.Span
import ai.kastrax.core.agent.agent
import ai.kastrax.core.tools.tool
import ai.kastrax.rag.RAG
import ai.kastrax.integrations.deepseek.deepSeek
import ai.kastrax.integrations.deepseek.DeepSeekModel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // 初始化追踪器
    val tracer = Tracer.create("ai-assistant-app") {
        exporter = JaegerExporter {
            endpoint = "http://localhost:14250"
        }
        sampler = Sampler.ALWAYS_ON
    }
    
    // 创建根追踪
    val rootSpan = tracer.startSpan("main-application")
    
    try {
        // 创建 RAG 系统
        rootSpan.addEvent("初始化 RAG 系统")
        val rag = RAG(
            vectorStore = vectorStore,
            embeddingService = embeddingService,
            tracer = tracer
        )
        
        // 添加文档
        val addDocsSpan = tracer.startSpan("add-documents", parent = rootSpan)
        try {
            val documents = loadDocuments("knowledge_base/")
            addDocsSpan.setAttribute("document_count", documents.size)
            
            rag.addDocuments(documents)
        } finally {
            addDocsSpan.end()
        }
        
        // 创建工具
        rootSpan.addEvent("创建工具")
        val searchTool = tool("search") {
            description("搜索知识库")
            parameters {
                parameter("query", "搜索查询", String::class)
                parameter("limit", "结果数量限制", Int::class, required = false, defaultValue = 5)
            }
            
            // 启用追踪
            tracing {
                enabled = true
                tracer = tracer
            }
            
            execute { params ->
                val span = tracer.startSpan("search-tool-execution")
                
                try {
                    val query = params["query"] as String
                    val limit = params["limit"] as? Int ?: 5
                    
                    span.setAttribute("query", query)
                    span.setAttribute("limit", limit)
                    
                    // 执行搜索
                    val results = rag.search(query, limit = limit)
                    span.setAttribute("result_count", results.size)
                    
                    // 格式化结果
                    val formattedResults = results.mapIndexed { index, result ->
                        "${index + 1}. ${result.document.content.take(200)}..."
                    }.joinToString("\n\n")
                    
                    "搜索结果:\n\n$formattedResults"
                } catch (e: Exception) {
                    span.recordException(e)
                    span.setStatus(StatusCode.ERROR, e.message ?: "搜索错误")
                    "搜索失败: ${e.message}"
                } finally {
                    span.end()
                }
            }
        }
        
        // 创建代理
        rootSpan.addEvent("创建代理")
        val assistant = agent {
            name("知识助手")
            description("一个可以搜索知识库的助手")
            
            model = deepSeek {
                apiKey("your-deepseek-api-key")
                model(DeepSeekModel.DEEPSEEK_CHAT)
                temperature(0.7)
            }
            
            // 添加工具
            tools {
                tool(searchTool)
            }
            
            // 启用追踪
            tracing {
                enabled = true
                tracer = tracer
                level = TracingLevel.DETAILED
            }
        }
        
        // 处理用户查询
        val querySpan = tracer.startSpan("process-user-query", parent = rootSpan)
        try {
            val userQuery = "量子计算的基本原理是什么？"
            querySpan.setAttribute("user_query", userQuery)
            
            // 生成响应
            val response = assistant.generate(userQuery)
            querySpan.setAttribute("response_length", response.text.length)
            querySpan.setAttribute("token_usage", response.usage?.totalTokens ?: 0)
            
            // 记录工具使用情况
            response.toolCalls?.forEach { toolCall ->
                querySpan.addEvent("工具调用", mapOf(
                    "tool_name" to toolCall.name,
                    "tool_input" to toolCall.input.toString()
                ))
            }
            
            println("用户查询: $userQuery")
            println("助手响应: ${response.text}")
        } finally {
            querySpan.end()
        }
        
        // 添加最终事件
        rootSpan.addEvent("应用程序完成")
    } catch (e: Exception) {
        rootSpan.recordException(e)
        rootSpan.setStatus(StatusCode.ERROR, e.message ?: "应用程序错误")
        println("发生错误: ${e.message}")
    } finally {
        rootSpan.end()
    }
    
    // 关闭追踪器
    tracer.close()
    println("追踪数据已发送到 Jaeger")
}

// 加载文档的辅助函数
suspend fun loadDocuments(directory: String): List<Document> {
    val loader = DocumentLoader.fromDirectory(directory)
    return loader.load()
}
```

## 下一步 ✅

现在您已经了解了追踪系统，您可以：

1. 了解[日志系统](./logging.mdx)的详细用法
2. 探索[Next.js 追踪](./nextjs-tracing.mdx)的集成方法
3. 学习如何将追踪与其他可观测性工具结合使用
